
Conversation

@nyaapa (Contributor) commented Nov 18, 2025

What changes were proposed in this pull request?

  • Group multiple keys into one Arrow batch; this generally produces far fewer batches when key cardinality is high (see the sketch after this list).
  • Do not group init_data and input_data in batch0: serialize init_data first, then input_data. In the worst case this adds one extra chunk, but it makes the Python-side logic much simpler.
  • Do not create extra dataframes when they are not needed; copy the empty one instead.
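
A minimal sketch of the first bullet, assuming pyarrow and an illustrative schema and record limit; this is not the PR's serializer code, just the shape of the idea:

import pyarrow as pa

schema = pa.schema([("key", pa.string()), ("value", pa.int64())])

def batched(rows, max_records_per_batch=10_000):
    # Accumulate rows across key boundaries and emit one Arrow batch per chunk,
    # instead of one batch per key.
    buffer = []
    for row in rows:
        buffer.append(row)
        if len(buffer) >= max_records_per_batch:
            yield pa.RecordBatch.from_pylist(buffer, schema=schema)
            buffer = []
    if buffer:
        yield pa.RecordBatch.from_pylist(buffer, schema=schema)

# Three distinct keys end up in a single batch rather than three:
rows = [{"key": "a", "value": 1}, {"key": "b", "value": 2}, {"key": "c", "value": 3}]
print(sum(1 for _ in batched(rows)))  # 1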

Why are the changes needed?

Benchmark results show that in high-cardinality scenarios this optimization improves batch0 time by ~40%. No visible regressions in the low-cardinality case.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing UT and Benchmark:

10,000,000 distinct keys in init state (8x i3.4xlarge):
- Without Optimization: 11400 records/s
- With Optimization: 30000 records/s

Was this patch authored or co-authored using generative AI tooling?

No

@holdenk (Contributor) left a comment


Just started taking a look; it's been a hot minute since I looked at the Arrow serialization logic, so some perhaps silly questions.


# Check if the entire column is null
if data_column.null_count == len(data_column):
    return None
Contributor

Given we've changed the implicit type signature of the function, let's maybe add a type annotation on generate_data_batches for readability.
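
For illustration only (the actual generate_data_batches signature in pyspark's serializers is Spark-internal and may differ), the kind of annotation being asked for could look like:

from typing import Any, Iterator, Tuple
import pandas as pd
import pyarrow as pa

def generate_data_batches(
    batches: Iterator[pa.RecordBatch],
) -> Iterator[Tuple[Any, pd.DataFrame]]:
    # Hypothetical: group Arrow batches by grouping key and yield (key, data) pairs.
    ...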

Contributor Author

Done


parsed_offsets = extract_key_value_indexes(arg_offsets)

import pandas as pd
Contributor

Random place for an import
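
The conventional fix is to hoist the import to module scope rather than importing pandas mid-function; a toy sketch, not the PR's actual code:

import pandas as pd  # at the top of the module, next to the other imports

def empty_frame(columns):
    # Illustrative helper using the module-level import.
    return pd.DataFrame(columns=list(columns))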

init_rows.append(init_row)

total_len = len(input_rows) + len(init_rows)
if total_len >= self.arrow_max_records_per_batch:
Contributor

The SQLConf config param says that if it is set to zero or a negative number there is no limit, but in this code a zero or negative value means we always output a fresh batch per row. Let's change the behaviour and add a test covering this.
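
A self-contained sketch of the requested behaviour, with illustrative names (assuming the limit mirrors a maxRecordsPerBatch-style config where zero or negative means unlimited):

def should_flush(total_len, max_records_per_batch):
    # A positive limit triggers a flush once reached; zero or negative means "no limit".
    return max_records_per_batch > 0 and total_len >= max_records_per_batch

assert should_flush(10, 5) is True
assert should_flush(10, 0) is False   # 0: unlimited, never flush early
assert should_flush(10, -1) is False  # negative: unlimited as well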

@nyaapa (Contributor Author) Nov 20, 2025

oh, right;
copied that from non-init state handling; 🫠
nice catch!

Contributor Author

Done

Comment on lines 1994 to 2005
def row_stream():
    for batch in batches:
        if self.arrow_max_bytes_per_batch != 2**31 - 1 and batch.num_rows > 0:
            batch_bytes = sum(
                buf.size
                for col in batch.columns
                for buf in col.buffers()
                if buf is not None
            )
            self.total_bytes += batch_bytes
            self.total_rows += batch.num_rows
            self.average_arrow_row_size = self.total_bytes / self.total_rows
Contributor

This logic seems to be duplicated from elsewhere in the file; maybe we can add it to a base class?
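
One possible shape for such a shared helper, with illustrative names rather than the actual pyspark class hierarchy:

import pyarrow as pa

class ArrowBatchSizeTracker:
    # Hypothetical helper holding the running batch-size statistics
    # that several serializers currently duplicate.

    def __init__(self):
        self.total_bytes = 0
        self.total_rows = 0
        self.average_arrow_row_size = 0.0

    def track_batch(self, batch: pa.RecordBatch) -> None:
        # Accumulate Arrow buffer sizes and keep a running average row size.
        if batch.num_rows == 0:
            return
        self.total_bytes += sum(
            buf.size
            for col in batch.columns
            for buf in col.buffers()
            if buf is not None
        )
        self.total_rows += batch.num_rows
        self.average_arrow_row_size = self.total_bytes / self.total_rows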

Contributor Author

Done
